package com.tpvlog.dfs.client;

import com.tpvlog.dfs.rpc.service.*;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.NegotiationType;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;

/**
 * Client component that talks to the NameNode over gRPC.
 *
 * <p>Each instance owns one plaintext {@link ManagedChannel} to
 * {@link #NAMENODE_HOSTNAME}:{@link #NAMENODE_PORT} and a blocking stub bound to it.
 * Call {@link #close()} when the client is no longer needed so the channel is released.
 *
 * @author Ressmix
 */
public class NameNodeRpcClient implements AutoCloseable {

    public static final String NAMENODE_HOSTNAME = "localhost";
    public static final Integer NAMENODE_PORT = 50070;

    /** Channel backing {@link #namenode}; retained so that it can be shut down in {@link #close()}. */
    private final ManagedChannel channel;

    /**
     * Blocking stub used for every RPC. Kept per instance: a static stub assigned from the
     * constructor would be overwritten for all existing clients each time a new one is created.
     */
    private final NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;

    /**
     * Builds a plaintext (non-TLS) channel to the configured NameNode address and creates
     * the blocking stub on top of it.
     */
    NameNodeRpcClient() {
        this.channel = NettyChannelBuilder.forAddress(NAMENODE_HOSTNAME, NAMENODE_PORT)
                .negotiationType(NegotiationType.PLAINTEXT)
                .build();
        this.namenode = NameNodeServiceGrpc.newBlockingStub(channel);
    }

    /**
     * Sends a mkdir request for the given path to the NameNode and prints the status of the
     * response to standard output.
     *
     * @param path path carried in the {@code MkDirRequest}
     * @throws Exception declared for callers; RPC failures surface from the blocking stub call
     */
    public void mkdir(String path) throws Exception {
        // Invoke the RPC service.
        MkDirRequest request = MkDirRequest.newBuilder().setPath(path).build();
        MkDirResponse response = namenode.mkdir(request);
        System.out.println("创建目录的响应：" + response.getStatus());
    }

    /**
     * Sends a request to the NameNode to create a file.
     *
     * @param filename file name carried in the {@code CreateFileRequest}
     * @return {@code true} if the response status equals 1, {@code false} otherwise
     */
    public Boolean createFile(String filename) {
        CreateFileRequest request = CreateFileRequest.newBuilder().setFilename(filename).build();
        CreateFileResponse response = namenode.createFile(request);
        return response.getStatus() == 1;
    }

    /**
     * Asks the NameNode to allocate DataNodes for a file (two replicas, according to the
     * original author's note).
     *
     * @param filename file name carried in the request
     * @param fileSize file size carried in the request
     * @return the {@code datanodes} string from the response when the response status
     *         equals 1, otherwise {@code null}; the format of the string is defined by the
     *         NameNode and is not visible here
     */
    public String allocateDataNodes(String filename, long fileSize) {
        AllocateDataNodesRequest request = AllocateDataNodesRequest.newBuilder()
                .setFilename(filename)
                .setFilesize(fileSize)
                .build();
        AllocateDataNodesResponse response = namenode.allocateDataNodes(request);
        return response.getStatus() == 1 ? response.getDatanodes() : null;
    }

    /**
     * Asks the NameNode for a DataNode holding the given file (a random one, according to
     * the original author's note).
     *
     * @param filename file name carried in the request
     * @return the {@code datanode} field of the response; the response status is not checked
     */
    public String getDataNodeForFile(String filename) {
        GetDataNodeForFileRequest request = GetDataNodeForFileRequest.newBuilder()
                .setFilename(filename)
                .build();
        GetDataNodeForFileResponse response = namenode.getDataNodeForFile(request);
        return response.getDatanode();
    }

    /**
     * Sends a {@code ShutdownRequest} with code 1 to the NameNode. This is a remote call and
     * does not release this client's channel; use {@link #close()} for that.
     *
     * @throws Exception declared for callers; RPC failures surface from the blocking stub call
     */
    public void shutdown() throws Exception {
        ShutdownRequest request = ShutdownRequest.newBuilder().setCode(1).build();
        namenode.shutdown(request);
    }

    /**
     * Releases the underlying gRPC channel by calling {@link ManagedChannel#shutdown()}.
     * The client must not be used for further RPCs afterwards.
     */
    @Override
    public void close() {
        channel.shutdown();
    }
}
